{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# GeoPandas\n",
    "\n",
    "GeoPandas is an open source project to make working with geospatial data in python easier. GeoPandas extends the datatypes used by pandas to allow spatial operations on geometric types. Geometric operations are performed by shapely. Geopandas further depends on fiona for file access and descartes and matplotlib for plotting.\n",
    "\n",
    "The goal of GeoPandas is to make working with geospatial data in python easier. It combines the capabilities of pandas and shapely, providing geospatial operations in pandas and a high-level interface to multiple geometries to shapely. GeoPandas enables you to easily do operations in python that would otherwise require a spatial database such as PostGIS.\n",
    "\n",
    "## OGC SimpleFeatures\n",
    "\n",
    "A standard that specifies a common storage and access model of mostly two-dimensional geographical data (point, line, polygon, multi-point, multi-line, etc.) The formats were originally defined by the Open Geospatial Consortium (OGC) and described in their Simple Feature Access and Coordinate Transformation Service specifications. \n",
    "\n",
    "The standard defines a model for two-dimensional simple features, with linear interpolation between vertices. The data model defined is a hierarchy of classes. This part also defines representation using Well-Known Text (and Binary).\n",
    "\n",
    "### WKT/WKB: Well Known Text and Well Known Binary\n",
    "\n",
    "Well-known text (WKT) is a text markup language for representing vector geometry objects on a map, spatial reference systems of spatial objects and transformations between spatial reference systems. A binary equivalent, known as well-known binary (WKB), is used to transfer and store the same information on databases, such as PostGIS, Microsoft SQL Server and DB2. The formats were originally defined by the Open Geospatial Consortium (OGC) and described in their Simple Feature Access and Coordinate Transformation Service specifications.\n",
    "\n",
    "\n",
    "### Implications for PySpark data de/serialization & un/marshalling\n",
    "\n",
    "When using pyspark we want have to send data back and forth between master node and the workers which run jobs on the JVM. In order to simplify this rather than sending Python or more precisely Shapely objects we will use WKT. It works with one the libraries I am using today to perform Spatial Joins.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/imports.py'\n",
    "import notebook\n",
    "import os.path, json, io, pandas\n",
    "import matplotlib.pyplot as plt\n",
    "import matplotlib\n",
    "%pylab inline\n",
    "pylab.rcParams['figure.figsize'] = (54, 60)\n",
    "\n",
    "matplotlib.style.use('ggplot')\n",
    "\n",
    "from retrying import retry # for exponential back down when calling TurboOverdrive API\n",
    "\n",
    "import pyspark.sql.functions as func # resuse as func.coalace for example\n",
    "from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType\n",
    "\n",
    "import pandas as pandas\n",
    "from geopandas import GeoDataFrame # Loading boundaries Data\n",
    "from shapely.geometry import Point, Polygon, shape # creating geospatial data\n",
    "from shapely import wkb, wkt # creating and parsing geospatial data\n",
    "import overpy # OpenStreetMap API\n",
    "\n",
    "from ast import literal_eval as make_tuple # used to decode data from java\n",
    "\n",
    "# make sure nbextensions are installed\n",
    "notebook.nbextensions.check_nbextension('usability/codefolding', user=True)\n",
    "\n",
    "try:\n",
    "    sc\n",
    "except NameError:\n",
    "    import pyspark\n",
    "    sc = pyspark.SparkContext('local[*]')\n",
    "    sqlContext = pyspark.sql.SQLContext(sc)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/osm2geojson.py'\n",
    "### Helper functions\n",
    "\n",
    "# given shapely bounds return bbox compatiable with overpass turbo openstreetmap API\n",
    "def bbox(bounds):\n",
    "    return (bounds[1],bounds[0],bounds[3],bounds[2])\n",
    "\n",
    "# given an openstreetmap node retrun a GeoJSON feature\n",
    "def nodeToFeature(node):\n",
    "    properties = node.tags\n",
    "    properties['wkt'] = Point(node.lon, node.lat).wkt\n",
    "    return {\n",
    "        \"type\": \"Feature\",\n",
    "        \"properties\": properties,\n",
    "        \"geometry\": {\n",
    "            \"type\": \"Point\",\n",
    "            \"coordinates\": [\n",
    "                float(node.lon),\n",
    "                float(node.lat)\n",
    "            ]\n",
    "        }\n",
    "    }\n",
    "\n",
    "# given an array of nodes return an array of GeoJSON features\n",
    "def nodesToFeatures(nodes):\n",
    "    \"\"\"\n",
    "    :param nodes\n",
    "    :type nodes from overpy.Result (result.nodes)\n",
    "    :return:\n",
    "    \"\"\"\n",
    "    features = []\n",
    "    for node in nodes:\n",
    "        features.append(nodeToFeature(node))\n",
    "    return features\n",
    "\n",
    "def waysToFeatures(ways):\n",
    "    print ways\n",
    "    features = []\n",
    "    return features\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/query_overpass_api.py'\n",
    "# method to handle OverpassTooManyRequests exception from OpenStreetMap/overpass turbo API\n",
    "def retry_if_overpass_too_many_requests(exception):\n",
    "    return isinstance(exception, overpy.exception.OverpassTooManyRequests)\n",
    "\n",
    "# decorator to retry with exponential back off\n",
    "@retry(wait_exponential_multiplier=2000,\n",
    "       wait_exponential_max=60000,\n",
    "       retry_on_exception=retry_if_overpass_too_many_requests)\n",
    "def call_overpass_api(q):\n",
    "    return OVERPASS_API.query(q)\n",
    "\n",
    "def run_overpass_api(bounding_geo_df):\n",
    "    local_pois = []\n",
    "    for index, row in bounding_geo_df.iterrows():\n",
    "        # For documentation see:\n",
    "        # http://wiki.openstreetmap.org/wiki/Tag:{key}={value}\n",
    "        # e.g: http://wiki.openstreetmap.org/wiki/Tag:amenity=theatre\n",
    "        payload = \"\"\"\n",
    "            [out:json][timeout:60];\n",
    "            (\n",
    "              node[\"tourism\"=\"gallery\"]%(box)s;\n",
    "              node[\"tourism\"=\"artwork\"]%(box)s;\n",
    "              node[\"tourism\"=\"museum\"]%(box)s;\n",
    "              node[\"amenity\"=\"arts_centre\"]%(box)s;\n",
    "              node[\"amenity\"=\"theatre\"]%(box)s;\n",
    "            );\n",
    "            out body;\"\"\" % {'box': str(bbox(row.geometry.bounds))}\n",
    "        result = call_overpass_api(payload)\n",
    "        local_pois.extend(nodesToFeatures(result.nodes))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "OVERPASS_API         = overpy.Overpass()\n",
    "BASE_DIR             = os.path.join(os.path.abspath('.'), 'work-flow')\n",
    "URBAN_BOUNDARIES_FILE = '06_Europe_Cities_Boundaries_with_Labels_Population.geo.json'\n",
    "\n",
    "# Paths to base datasets that we are using:\n",
    "URBAN_BOUNDARIES_PATH = os.path.join(BASE_DIR,URBAN_BOUNDARIES_FILE)\n",
    "POIS_PATH            = os.path.join(BASE_DIR, \"pois.json\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Load the boundaries data\n",
    "geo_df = GeoDataFrame.from_file(URBAN_BOUNDARIES_PATH)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Fetch the the POIs but not now\n",
    "overwritePois = False # <= DON'T Set to True during workshop!        \n",
    "if overwritePois or not os.path.isfile(POIS_PATH):\n",
    "    # Write POIs file\n",
    "    pois = run_overpass_api(geo_df)\n",
    "    with io.open(POIS_PATH, 'w+', encoding='utf-8') as f:\n",
    "        f.write(unicode(json.dumps(pois, ensure_ascii= False)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "For the exercise below consult GeoPandas documentation http://geopandas.org/user.html\n",
    "\n",
    "## Exercise: \n",
    "\n",
    "* find out what the schema for the data is?\n",
    "* plot the geometries?\n",
    "* print out the wkt version of the geometries\n",
    "* find out the Spatial Reference System (CRS/SRS) for the GeoDataframe?\n",
    "* change the reference system to a cartesian system that is suitable for calculating areas\n",
    "* calculate the total area of the cities we are using HINT: Europe using this CRS: http://spatialreference.org/ref/epsg/3035/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Answer\n",
    "\n",
    "# * find out what the schema for the data is?\n",
    "\n",
    "# * plot the geometries?\n",
    "\n",
    "# * print out the wkt version of the geometries\n",
    "\n",
    "# * find out the Spatial Reference System (CRS/SRS) for the GeoDataframe?\n",
    "# A note about CRS's and Geospatial Data.\n",
    "# http://spatialreference.org/ref/epsg/etrs89-etrs-laea/\n",
    "\n",
    "# * change the reference system to a cartesian system that is suitable for calculating areas\n",
    "# * calculate the total area of the cities we are using \n",
    "#   HINT: Europe using this CRS: http://spatialreference.org/ref/epsg/3035/\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "## Solution\n",
    "\n",
    "# * find out what the schema for the data is?\n",
    "geo_df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# * plot the geometries?\n",
    "geo_df.plot()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# * print out the wkt version of the geometries\n",
    "wkts = map(lambda g: g.to_wkt() , geo_df.geometry)\n",
    "wkts[0]\n",
    "type(geo_df.geometry)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# * find out the Spatial Reference System (CRS/SRS) for the GeoDataframe?\n",
    "# A note about CRS's and Geospatial Data.\n",
    "# http://spatialreference.org/ref/epsg/etrs89-etrs-laea/\n",
    "geo_df.crs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# * change the reference system to a cartesian system that is suitable for calculating areas\n",
    "# * calculate the total area of the cities we are using \n",
    "#   HINT: Europe using this CRS: http://spatialreference.org/ref/epsg/3035/\n",
    "geo_df_cartesian = geo_df.to_crs(epsg=3035)\n",
    "print geo_df_cartesian.crs\n",
    "print geo_df.crs\n",
    "geo_df_cartesian.area.sum()/1000000"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Preparing our data for Spatial Spark\n",
    "\n",
    "The way I have written spatial spark is that it expects a geospatial column as a WKT string. Internally it uses this to create OGC Geometries via Java Topology Suite (JTS). So in order to use Spatial Spark we will add the WKT column to our data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Add a WKT column for use later\n",
    "geo_df['wkt'] = pandas.Series(\n",
    "    map(lambda geom: str(geom.to_wkt()), geo_df['geometry']), \n",
    "    index=geo_df.index, dtype='string')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "geo_df['wkt']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a pyspark dataframe from GeoPanadas\n",
    "\n",
    "Spark Dataframes are an abstraction over RDDs. Lets look at what happens when we call pyspark transform and action. \n",
    "\n",
    "**SLIDES Switch: Spark Internals**\n",
    "\n",
    "![](images/pyspark.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame\n",
    "# Creates a DataFrame from an RDD of tuple/list, list or pandas.DataFrame.\n",
    "\n",
    "# When schema is a list of column names, the type of each column will be inferred from data.\n",
    "\n",
    "# When schema is None, it will try to infer the schema (column names and types) from data, which should\n",
    "# be an RDD of Row, or namedtuple, or dict.\n",
    "\n",
    "# If schema inference is needed, samplingRatio is used to determined the ratio of rows used for schema inference.\n",
    "# The first row will be used if samplingRatio is None.\n",
    "\n",
    "# Parameters:\n",
    "# data – an RDD of Row/tuple/list/dict, list, or pandas.DataFrame.\n",
    "# schema – a StructType or list of column names. default None.\n",
    "# samplingRatio – the sample ratio of rows used for inferring\n",
    "# Returns:\n",
    "# DataFrame\n",
    "\n",
    "boundaries_from_pd = sqlContext.createDataFrame(geo_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Lets find Berlin\n",
    "berlin_rdd = boundaries_from_pd.filter(\n",
    "    boundaries_from_pd['NAMEASCII'] == 'Berlin'\n",
    ")\n",
    "wkt.loads(berlin_rdd.take(1)[0].wkt)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise \n",
    "\n",
    "* Look at the schema of the boundaries_from_pd\n",
    "* Graph the name of the city and its population sorted"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Answer 10 min\n",
    "\n",
    "# 1. print schema\n",
    "\n",
    "# 2. Print of cities sorted by its population\n",
    "# Hint you can go from spark data fram to Pandas data frame and plot that using the pandas plot function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# -------------------------- Don't look Below untill you have tried the Exerise ----------------------------- #\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "# Solution:\n",
    "boundaries_from_pd.printSchema()\n",
    "\n",
    "\n",
    "df = boundaries_from_pd.select(\n",
    "    boundaries_from_pd['NAMEASCII'], \n",
    "    boundaries_from_pd['POPEU2013'].cast(IntegerType())\n",
    " )\n",
    "\n",
    "df = df.sort(df['POPEU2013'].desc())\n",
    "df.show()\n",
    "\n",
    "# # SQL Version\n",
    "boundaries_from_pd.registerTempTable('boundaries')\n",
    "sqlContext.sql(\"SELECT NAME, POPEU2013 FROM boundaries\").show()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df = df.toPandas()\n",
    "df.plot.bar(x='NAMEASCII')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Lets look at the POI data next\n",
    "\n",
    "Our POI data comes from Openstreetmap and its stored as lines of GeoJSON point features. This format is perfect for working with Apache Spark. Spark loads the data from each row and infers the schema as well. This data could be stored on Amazon S3 or Hadoop File System (HDFS) allowing for relative huge datasets. Even though in our example we are reading the JSON from file we can just as easily point it to HDFS."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pois_df = sqlContext.read.json(POIS_PATH)\n",
    "\n",
    "print pois_df.count()\n",
    "rec = pois_df.take(1)[0]\n",
    "print rec"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Adding WKT attribute\n",
    "\n",
    "Similar to the data for boundaries we want to add WKT column. Notice how we can go from Spark Dataframe to Pandas dataframes and back seamlessly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def toWktColumn(coords):\n",
    "    return (Point(coords).wkt)\n",
    "\n",
    "pois_df = pois_df.toPandas()\n",
    "pois_df['wkt'] = pandas.Series(\n",
    "    map(lambda geom: toWktColumn(geom.coordinates), pois_df['geometry']), \n",
    "    index=pois_df.index, dtype='string')\n",
    "\n",
    "pois_df = sqlContext.createDataFrame(pois_df)\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pois_df.columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Add HiveContext\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import HiveContext\n",
    "sqlContext = HiveContext(sc)\n",
    "# sqlContext.setConf(\"hive.metastore.warehouse.dir\", os.getcwd()+\"/hive\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "sqlContext.sql(\"CREATE TEMPORARY FUNCTION OSMImportNodes AS 'info.pavie.osm2hive.controller.HiveNodeImporter'\")\n",
    "sqlContext.sql(\"CREATE TEMPORARY FUNCTION OSMImportWays AS 'info.pavie.osm2hive.controller.HiveWayImporter'\")\n",
    "sqlContext.sql(\"CREATE TEMPORARY FUNCTION OSMImportRelations AS 'info.pavie.osm2hive.controller.HiveRelationImporter'\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# sqlContext.sql(\"CREATE TABLE osmdata(osm_content STRING) STORED AS TEXTFILE\")\n",
    "\n",
    "# sqlContext.getConf(\"hive.metastore.warehouse.dir\")\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.9"
  },
  "toc": {
   "toc_cell": false,
   "toc_number_sections": true,
   "toc_threshold": 6,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
